ReactiveXのマーブル図入門
ReactiveXのマーブル図
ReactiveX(以下Rx)ではデータフローをマーブル図(Marble Diagram)と呼ばれる方法で表すことが多いです。 例えばFilterの図は以下のようなものです。
FilterやMapなどはなんとなくわかりますが、もう少し複雑なオペレータになってくると図の意味もよくわかりませんでした。
しかし、それはマーブル図の読み方をよく理解していないためでした。
この記事ではマーブル図を読み解く手助けをできればと思います。
マーブル図の基本
まずはストリームについて確認していきます。 ストリームはイベント列で時間に沿ってイベントが流れてきます。 ストリームは以下のように表されます。
時間を表す矢印の上にイベントが置かれています。 矢印の左の方が時間が経過しています。そのため左側の方が古いイベントということになります。 イメージとしては左から矢印が伸びていって、各イベントに到達する感じです。
このイベントには3つの種類があります。
- Next: ○
- Error: X
- Completed: |
NextはObservableが流してくる一般的なイベントです。Errorはエラーが起きたときに流すイベントです。 Completeは全てのイベントを流し終わった後に流れてくるものです。
リストからストリームを作ってみます。 今回はPythonで説明します。
from rx import from_ from_([1, 2, 3])\ .subscribe( on_next=print, on_completed=lambda: print('Done!') )
マーブル図は次のような感じです。
リストの最後の要素がストリームに流れた後にCompletedが流れます。
$ python3 list.py 1 2 3 Done!
ストリームの変換
Rxではストリームを変換して別のストリームをつくります。 例えばFilterでは要素がフィルタリングされた別のストリームができます。 ここで大切なのは変換は元のストリームには基本的に影響がないということです。 つまりストリームはある意味でImmutableです。
ストリームの変換は次のように表します。
四角の部分が変換の処理で、下に新しいストリームができています。
ストリームの分岐
Rxにおける変換は新しいストリームを生成することになります。
自分はストリームは常に1つで、変換の関数が適用されると元のイベントは変わってしまうと考えていました。
結果としてマーブル図の理解が不十分な状態でした。(なんでストリームが2本あるんだ?)
以下は自分が考えていた変換の概念図です。
これはマーブル図として意味が不明なのであまり深く考えないでください。
しかし、Rxへの理解が進むにつれてモヤッとしていた部分がわかりました。 元のストリームに影響を与えない性質があることで次のようなコードを書くことができます。
100以下の2の倍数と4の倍数を数えるコードです。
from rx import range from rx import operators as op stream = range(100).pipe( op.publish() ) mod2 = stream.pipe( op.filter(lambda x: x % 2 == 0), op.count() ) mod4 = stream.pipe( op.filter(lambda x: x % 4 == 0), op.count() ) mod2.subscribe( on_next=lambda x: print(f'mod2: {x}') ) mod4.subscribe( on_next=lambda x: print(f'mod4: {x}') ) stream.connect()
元のストリームは一つですがそこからFilterとカウントを行っています。 別々のストリームが生成されるのでFilterしても問題はありません。
$ python3 mod.py mod2: 50 mod4: 25
2の倍数と4の倍数は重複してますが、問題なく計算できています。
まとめ
マーブル図が読めるようになるとRxのオペレータが直感的に理解しやすくなると思います。 今回は自分が読めるようになったコツをまとめました。